package provider

/**
 ** TCPServiceManage：TCP监听服务管理，有连接到来时创建出TCPService与之关联；
 ** TCPService：单个连接TCP服务运行实体;
*/

import (
	. "cppcloud"
	"encoding/json"
	"fmt"
	"net"
	"reflect"
	"sync"
)

// TCPServiceManage owns the TCP listening socket and keeps track of the
// TCPService created for every accepted connection.
type TCPServiceManage struct {
	svrAddr        string               // listen address, e.g. 192.168.1.100:3000
	sockListen     net.Listener         // listening socket opened by createTCPServiceManage
	waitGroup      sync.WaitGroup       // waited on by Join; a pointer to it is given to every TCPService
	stat           MsgStat              // traffic statistics; a pointer to it is given to every TCPService
	cmdMapping     map[uint16]TCPServCB // handlers for received messages, keyed by command id
	resultCallBack func(bool)           // passed to every ResponseHelper; not assigned anywhere in the visible code

	conns     map[*TCPService]int8 // live connection services; run registers each one with a value of 2
	connsLock sync.Mutex           // guards conns
	exitfg    bool                 // set by Shutdown; checked by the accept loop in run
}

// createTCPServiceManage starts listening for TCP connections on listenAddr
// (host:port) and returns a manager with empty handler and connection tables.
// When the listen call fails, the returned manager is nil and err carries the
// error reported by net.Listen.
func createTCPServiceManage(listenAddr string) (tcpsvrMgr *TCPServiceManage, err error) {
	listener, listenErr := net.Listen("tcp", listenAddr)
	if listenErr != nil {
		return nil, listenErr
	}

	return &TCPServiceManage{
		svrAddr:    listenAddr,
		sockListen: listener,
		cmdMapping: make(map[uint16]TCPServCB),
		conns:      make(map[*TCPService]int8),
	}, nil
}

// RegisteTCPCallback installs cb as the handler for messages whose command id
// is cmdid, replacing any handler registered earlier for the same id.
func (tcpsvrMgr *TCPServiceManage) RegisteTCPCallback(cmdid uint16, cb TCPServCB) {
	handlers := tcpsvrMgr.cmdMapping
	handlers[cmdid] = cb
}

// Join blocks until the manager's wait group reaches zero, i.e. until every
// goroutine that registered itself on it has called Done.
func (tcpsvrMgr *TCPServiceManage) Join() {
	pending := &tcpsvrMgr.waitGroup
	pending.Wait()
}

// Shutdown stops the service. It raises exitfg so the accept loop in run
// stops iterating, closes the listening socket so a pending Accept returns,
// and asks every connection still tracked in conns to shut down.
//
// The sockListen field is read once into a local and is intentionally NOT
// reset to nil. Shutdown runs on a different goroutine than run, which uses
// the same field for Accept; clearing it here could make run call Accept on
// a nil listener, and re-reading it after the nil check could observe a
// concurrent reset.
func (tcpsvrMgr *TCPServiceManage) Shutdown() {
	tcpsvrMgr.exitfg = true
	if listener := tcpsvrMgr.sockListen; nil != listener {
		// Best effort: the listener may already be closed, in which case
		// Close only reports an error that carries no useful information.
		listener.Close()
	}

	tcpsvrMgr.connsLock.Lock()
	defer tcpsvrMgr.connsLock.Unlock()
	for tcpserv := range tcpsvrMgr.conns {
		tcpserv.shutdown() // tell each connection to close and exit
	}
}

// run is the accept loop. Until exitfg is raised or Accept fails, it wraps
// each accepted connection in a TCPService that shares the manager's handler
// table, statistics and wait group, registers it in conns with a count of 2
// (one per goroutine) and starts its send and receive goroutines.
//
// On exit it closes the listener and calls Done on the manager's wait group.
// NOTE(review): the matching waitGroup.Add is not in this function;
// presumably the caller performs it before starting run - confirm.
func (tcpsvrMgr *TCPServiceManage) run() {
	// Work on a private reference to the listener. Shutdown is called from
	// another goroutine and may clear tcpsvrMgr.sockListen, so re-reading the
	// field before every Accept could dereference a nil listener. For the
	// same reason this function never writes the field.
	listener := tcpsvrMgr.sockListen

	defer func() {
		if nil != listener {
			// Best effort: the listener may already have been closed.
			listener.Close()
		}
		tcpsvrMgr.waitGroup.Done()
	}()

	if nil == listener {
		return // nothing to accept on (e.g. already shut down)
	}

	for !tcpsvrMgr.exitfg {
		conn, err := listener.Accept()
		if nil != err {
			// Closing the listener makes Accept fail, which is how a
			// blocked loop is woken up on shutdown.
			break
		}

		tcpser := &TCPService{
			mgr:        tcpsvrMgr,
			cli:        conn,
			sndQ:       make(chan *MessageData, 100),
			cmdMapping: tcpsvrMgr.cmdMapping,
			stat:       &tcpsvrMgr.stat,
			waitGroup:  &tcpsvrMgr.waitGroup,
		}

		// Two references: one released by each of the goroutines below.
		tcpsvrMgr.addSvrConn(tcpser, 2)
		go tcpser.sendRoutine()
		go tcpser.recvRoutine()
	}
}

// addSvrConn adjusts, under connsLock, the counter stored for tcpser in conns.
//
// A positive delta is added to the current counter, creating the entry when
// tcpser is not tracked yet. A zero or negative delta only applies to a
// tracked service: the counter is updated, and the entry is removed once the
// result is zero or below.
func (tcpsvrMgr *TCPServiceManage) addSvrConn(tcpser *TCPService, delta int8) {
	tcpsvrMgr.connsLock.Lock()
	defer tcpsvrMgr.connsLock.Unlock()

	// current is 0 when the service is not tracked.
	current, tracked := tcpsvrMgr.conns[tcpser]

	switch {
	case delta > 0:
		tcpsvrMgr.conns[tcpser] = current + delta
	case !tracked:
		// Nothing to release for an unknown service.
	default:
		if remaining := current + delta; remaining > 0 {
			tcpsvrMgr.conns[tcpser] = remaining
		} else {
			delete(tcpsvrMgr.conns, tcpser)
		}
	}
}

// TCPService is the runtime entity that serves a single accepted TCP
// connection. run creates one per connection and starts sendRoutine and
// recvRoutine on it.
type TCPService struct {
	mgr *TCPServiceManage // manager that accepted the connection
	cli net.Conn          // client connection; closed when the send/receive routines exit
	err error             // last error recorded by sendByteArr

	sndQ       chan *MessageData    // queue of outgoing response messages
	cmdMapping map[uint16]TCPServCB // handlers for received messages, keyed by command id

	stat      *MsgStat        // traffic statistics; run points it at the manager's stat
	exitfg    bool            // set by shutdown; checked by both routine loops
	waitGroup *sync.WaitGroup // run points it at the manager's wait group
}

// ResponseHelper is handed to a TCPServCB so the handler can answer the
// request it was invoked for and report the processing result.
// NOTE: recvRoutine builds this struct with a positional literal, so the
// field order must not change.
type ResponseHelper struct {
	cmdid uint16            // command id of the request being answered
	seqid uint16            // sequence id of the request, copied into the response
	sndQ  chan *MessageData // send queue of the connection the request arrived on

	resultCallBack func(bool) // invoked by SetResult; taken from the manager's resultCallBack
}

// SendMessage queues msg as the response to the request this helper was
// created for. The response reuses the request's sequence id and carries the
// request's command id OR-ed with CMDID_MID. The helper may be kept and used
// after the handler has returned (asynchronous replies).
//
// The call blocks while the connection's send queue is full.
func (helper *ResponseHelper) SendMessage(msg interface{}) {
	reply := new(MessageData)
	reply.Cmdid = CMDID_MID | helper.cmdid
	reply.Seqid = helper.seqid
	reply.Body = msg

	helper.sndQ <- reply
}

// SetResult reports the outcome of handling a request by forwarding isOk to
// the helper's result callback.
//
// The callback is optional: when none was configured (the field is nil) the
// result is dropped instead of panicking on a nil function call.
func (helper *ResponseHelper) SetResult(isOk bool) {
	if nil == helper.resultCallBack {
		return
	}
	helper.resultCallBack(isOk)
}

// TCPServCB is the prototype of a TCP service handler. recvRoutine calls the
// handler registered for a received command id with that cmdid, the received
// message msg, and a helper for sending the response and reporting the result.
type TCPServCB func(cmdid uint16, msg string, helper *ResponseHelper)

// recvRoutine is the receive loop of one connection. It reads messages with
// RecvMessage and dispatches each one to the handler registered for its
// command id; messages without a handler are dropped. Handlers run
// synchronously on this goroutine.
//
// The loop ends when exitfg is raised or when RecvMessage reports an error.
// On exit the routine releases its reference in the manager's connection
// table, closes the connection and calls Done on the wait group.
//
// NOTE(review): waitGroup.Add(1) is executed inside the goroutine, so a
// concurrent Wait may not observe it; moving it to the spawner needs a
// coordinated change there.
func (tcpser *TCPService) recvRoutine() {
	// Keep a private reference to the connection: the send routine clears
	// tcpser.cli when it exits, so the field may become nil at any time.
	// For the same reason this routine closes the connection but leaves the
	// field alone, so a late send fails with a write error, not a nil call.
	conn := tcpser.cli

	defer func() {
		tcpser.mgr.addSvrConn(tcpser, -1)
		if nil != conn {
			// Closing an already closed connection only returns an error.
			conn.Close()
		}
		tcpser.waitGroup.Done()
	}()

	tcpser.waitGroup.Add(1)
	if nil == conn {
		return
	}

	for !tcpser.exitfg {
		cmdid, seqid, msg, err := RecvMessage(conn, &tcpser.stat.RecvBytes)
		if nil != err {
			// A failed read (peer closed, reset, local close on shutdown)
			// cannot be retried on the same stream; looping again would
			// spin on the dead socket until shutdown.
			break
		}

		handleCB, ok := tcpser.cmdMapping[cmdid]
		if !ok {
			continue // no handler registered for this command id
		}
		handleCB(cmdid, msg, &ResponseHelper{
			cmdid:          cmdid,
			seqid:          seqid,
			sndQ:           tcpser.sndQ,
			resultCallBack: tcpser.mgr.resultCallBack,
		})
	}
}

// sendRoutine is the send goroutine of one connection. It takes messages from
// sndQ, turns the body into bytes and writes it with sendByteArr. Supported
// body types are string, []byte and map[string]interface{} (sent as JSON);
// anything else, or a map that cannot be marshalled, is logged and skipped.
//
// The loop ends when exitfg is raised or sndQ is closed. On exit the routine
// releases its reference in the manager's connection table, closes and clears
// the connection, and calls Done on the wait group.
func (tcpser *TCPService) sendRoutine() {
	defer func() {
		tcpser.mgr.addSvrConn(tcpser, -1)
		if nil != tcpser.cli {
			tcpser.cli.Close()
			tcpser.cli = nil
		}
		tcpser.waitGroup.Done()
	}()

	tcpser.waitGroup.Add(1)

	for !tcpser.exitfg {
		msg, open := <-tcpser.sndQ
		if !open {
			break
		}

		// Normalise the body to the raw bytes that go on the wire.
		var payload []byte
		switch body := msg.Body.(type) {
		case string:
			payload = []byte(body)
		case []byte:
			payload = body
		case map[string]interface{}:
			encoded, err := json.Marshal(body)
			if nil != err {
				fmt.Println("JSON-MARSHAL-INVALID|", err, msg)
				continue
			}
			payload = encoded
		default:
			fmt.Println("INVALID-BYDY-TYPE|", reflect.TypeOf(msg.Body), msg.Body)
			continue
		}

		if nSnd := tcpser.sendByteArr(msg.Cmdid, msg.Seqid, payload); nSnd < 1 {
			fmt.Println("SND-ERROR|", nSnd, "msg=", msg)
		}
	}
}

// shutdown asks the connection's goroutines to stop: it raises exitfg and
// closes sndQ, which wakes a send routine blocked on the queue.
//
// The call is idempotent. exitfg is only ever set here, so it doubles as the
// "already shut down" marker; without the check a second call would close an
// already closed channel and panic.
func (tcpser *TCPService) shutdown() {
	if tcpser.exitfg {
		return
	}
	tcpser.exitfg = true
	close(tcpser.sndQ)
}

// sendByteArr frames body with ToBytes(cmdid, seqid, body) and writes the
// result to the client connection.
//
// It returns the number of bytes written. The write error, if any, is kept
// in tcpser.err; the send counters in stat are only updated after a
// successful write.
//
// The connection field is cleared by the routines' deferred cleanup, so a
// message sent late can arrive here after the connection is gone. That case
// is reported as an error (return value 0) instead of calling Write on nil.
func (tcpser *TCPService) sendByteArr(cmdid uint16, seqid uint16, body []byte) (ret int) {
	conn := tcpser.cli
	if nil == conn {
		tcpser.err = fmt.Errorf("connection already closed")
		return 0
	}

	ret, tcpser.err = conn.Write(ToBytes(cmdid, seqid, body))
	if nil == tcpser.err {
		tcpser.stat.SendBytes += int64(ret)
		tcpser.stat.SendPkgn++
	}
	return
}
